import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';

/// Test page that registers the `ReplaySubject` cases from rxdart.
///
/// All cases are registered from the constructor inside a single
/// `group('ReplaySubject', ...)`. Each case builds a fresh [ReplaySubject],
/// drives it (`add`, `addError`, `addStream`, `close`, ...) and hands the
/// observed value, or a closure wrapping the call under test, to the
/// one-argument `expect`.
///
/// NOTE(review): `group`, `test` and `expect` are not defined in this file;
/// they presumably come from `../../common/test_page.dart`. What `expect` does
/// with its argument (and whether it invokes closures) is not visible here.
class ReplaySubjectTestPage extends TestPage {
  /// Creates the page with the given title and registers every test case.
  ReplaySubjectTestPage(super.title) {
    group('ReplaySubject', () {
      test('replays the previously emitted items to every subscriber',
          () async {
        final subject = ReplaySubject<int>();

        subject.add(1);
        subject.add(2);
        subject.add(3);

        expect(subject.stream.values);
      });

      test(
          'replays the previously emitted items to every subscriber, includes null',
          () async {
        final subject = ReplaySubject<int?>();

        subject.add(null);
        subject.add(1);
        subject.add(2);
        subject.add(3);
        subject.add(null);

        expect(subject.stream.values);
        expect(subject.stream.values);
        expect(subject.stream.values);
      });

      test('replays the previously emitted errors to every subscriber',
          () async {
        final subject = ReplaySubject<int>();

        subject.addError(Exception());
        subject.addError(Exception());
        subject.addError(Exception());
        expect(subject.stream.errors);
        expect(subject.stream.errors);
        expect(subject.stream.errors);
      });

      test(
          'replays the previously emitted items to every subscriber that directly subscribes to the Subject',
          () async {
        final subject = ReplaySubject<int>();

        subject.add(1);
        subject.add(2);
        subject.add(3);

        expect(subject.values);
        expect(subject.values);
        expect(subject.values);
      });

      test(
          'replays the previously emitted items and errors to every subscriber that directly subscribes to the Subject',
          () async {
        final subject = ReplaySubject<int>();

        subject.add(1);
        subject.addError(Exception());
        subject.addError(Exception());
        subject.add(2);

        expect(subject.values);
        expect(subject.values);
        expect(subject.values);
      });

      test('synchronously get the previous items', () async {
        final subject = ReplaySubject<int>();

        subject.add(1);
        subject.add(2);
        subject.add(3);

        expect(subject.values);
      });

      test('synchronously get the previous errors', () {
        final subject = ReplaySubject<int>();
        final e1 = Exception(), e2 = Exception(), e3 = Exception();
        final stackTrace = StackTrace.fromString('#');

        // Only the second error carries an explicit stack trace.
        subject.addError(e1);
        subject.addError(e2, stackTrace);
        subject.addError(e3);

        expect(subject.errors);
        expect(subject.stackTraces);
      });

      test('replays the most recently emitted items up to a max size',
          () async {
        final subject = ReplaySubject<int>(maxSize: 2);

        subject.add(1); // Should be dropped
        subject.add(2);
        subject.add(3);

        expect(subject.stream.values);
        expect(subject.stream.values);
        expect(subject.stream.values);
      });

      test('emits done event to listeners when the subject is closed',
          () async {
        final subject = ReplaySubject<int>();

        expect(subject.isClosed);

        subject.add(1);
        // Close from a microtask so the calls below are issued first.
        scheduleMicrotask(() => subject.close());

        expect(subject.stream.values);
        expect(subject.isClosed);
      });

      test('emits error events to subscribers', () async {
        final subject = ReplaySubject<int>();

        scheduleMicrotask(() => subject.addError(Exception()));

        expect(subject.stream.values);
      });

      test('replays the previously emitted items from addStream', () async {
        final subject = ReplaySubject<int>();

        await subject.addStream(Stream<int>.fromIterable(const [1, 2, 3]));

        expect(subject.stream.values);
        expect(subject.stream.values);
        expect(subject.stream.values);
      });

      test('allows items to be added once addStream is complete', () async {
        final subject = ReplaySubject<int>();

        await subject.addStream(Stream.fromIterable(const [1, 2]));
        subject.add(3);

        expect(subject.stream.values);
      });

      test('allows items to be added once addStream completes with an error',
          () async {
        final subject = ReplaySubject<int>();

        // The item is added only after the addStream future has settled.
        unawaited(subject
            .addStream(Stream<int>.error(Exception()), cancelOnError: true)
            .whenComplete(() => subject.add(1)));

        expect(subject.stream.values);
      });

      test('does not allow events to be added when addStream is active',
          () async {
        final subject = ReplaySubject<int>();

        // Purposely don't wait for the future to complete, then try to add
        // an item.
        unawaited(subject.addStream(Stream.fromIterable(const [1, 2, 3])));

        expect(() => subject.add(1));
      });

      test('does not allow errors to be added when addStream is active',
          () async {
        final subject = ReplaySubject<int>();

        // Purposely don't wait for the future to complete, then try to add
        // an error.
        unawaited(subject.addStream(Stream.fromIterable(const [1, 2, 3])));

        expect(() => subject.addError(Error()));
      });

      test('does not allow subject to be closed when addStream is active',
          () async {
        final subject = ReplaySubject<int>();

        // Purposely don't wait for the future to complete, then try to close
        // the subject.
        unawaited(subject.addStream(Stream.fromIterable(const [1, 2, 3])));

        expect(() => subject.close());
      });

      test(
          'does not allow addStream to add items when previous addStream is active',
          () async {
        final subject = ReplaySubject<int>();

        // Purposely don't wait for the future to complete, then try to add
        // a second stream.
        unawaited(subject.addStream(Stream.fromIterable(const [1, 2, 3])));

        expect(() => subject.addStream(Stream.fromIterable(const [1])));
      });

      test('returns onListen callback set in constructor', () async {
        void testOnListen() {}

        final subject = ReplaySubject<int>(onListen: testOnListen);

        expect('${subject.onListen}, $testOnListen');
      });

      test('sets onListen callback', () async {
        void testOnListen() {}

        final subject = ReplaySubject<int>();

        // Observed once before and once after the callback is assigned.
        expect(subject.onListen);

        subject.onListen = testOnListen;

        expect('${subject.onListen}, $testOnListen');
      });

      test('returns onCancel callback set in constructor', () async {
        Future<void> onCancel() => Future<void>.value(null);

        final subject = ReplaySubject<void>(onCancel: onCancel);

        expect('${subject.onCancel}, $onCancel');
      });

      test('sets onCancel callback', () async {
        void testOnCancel() {}

        final subject = ReplaySubject<void>();

        // Observed once before and once after the callback is assigned.
        expect(subject.onCancel);

        subject.onCancel = testOnCancel;

        expect('${subject.onCancel}, $testOnCancel');
      });

      test('reports if a listener is present', () async {
        final subject = ReplaySubject<void>();

        // Observed once before and once after a listener is attached.
        expect(subject.hasListener);

        subject.stream.listen(null);

        expect(subject.hasListener);
      });

      test('onPause unsupported', () {
        final subject = ReplaySubject<void>();

        expect(subject.isPaused);
        expect(() => subject.onPause);
        expect(() => subject.onPause = () {});
      });

      test('onResume unsupported', () {
        final subject = ReplaySubject<void>();

        expect(() => subject.onResume);
        expect(() => subject.onResume = () {});
      });

      test('returns controller sink', () async {
        final subject = ReplaySubject<int>();

        expect(subject.sink);
      });

      test('correctly closes done Future', () async {
        final subject = ReplaySubject<int>();

        scheduleMicrotask(subject.close);

        expect(subject.done);
      });

      test('can be listened to multiple times', () async {
        final subject = ReplaySubject<int>();
        // Captured before any item is added; the same object is observed
        // twice below.
        final stream = subject.stream.values;

        subject.add(1);
        subject.add(2);

        expect(stream);
        expect(stream);
      });

      test('always returns the same stream', () async {
        final subject = ReplaySubject<int>();

        expect(subject.stream.values);
      });

      test('adding to sink has same behavior as adding to Subject itself',
          () async {
        final subject = ReplaySubject<int>();

        subject.sink.add(1);
        subject.sink.add(2);
        subject.sink.add(3);

        expect(subject.stream.values);
        expect(subject.stream.values);
        expect(subject.stream.values);
      });

      test('is always treated as a broadcast Stream', () async {
        final subject = ReplaySubject<int>();
        final stream = subject.asyncMap((event) => Future.value(event));

        expect(subject.isBroadcast);
        expect(stream.isBroadcast);
      });

      test('issue/419: sync behavior', () async {
        final subject = ReplaySubject<int>(sync: true)..add(1);
        final mappedStream = subject.map((event) => event).shareValue();

        mappedStream.listen(null);

        expect(mappedStream.valueOrNull);

        await subject.close();
      });

      test('issue/419: sync throughput', () async {
        final subject = ReplaySubject<int>(sync: true)..add(1);
        final mappedStream = subject.map((event) => event).shareValue();

        mappedStream.listen(null);

        subject.add(2);

        expect(mappedStream.valueOrNull);

        await subject.close();
      });

      test('issue/419: async behavior', () async {
        final subject = ReplaySubject<int>()..add(1);
        final mappedStream = subject.map((event) => event).shareValue();

        mappedStream.listen(null, onDone: () => expect(mappedStream.value));

        expect(mappedStream.valueOrNull);

        await subject.close();
      });

      test('issue/419: async throughput', () async {
        final subject = ReplaySubject<int>()..add(1);
        final mappedStream = subject.map((event) => event).shareValue();

        mappedStream.listen(null, onDone: () => expect(mappedStream.value));

        subject.add(2);

        expect(mappedStream.valueOrNull);

        await subject.close();
      });

      test('do not update buffer after closed', () {
        final subject = ReplaySubject<int>();

        subject.add(1);
        expect(subject.values);

        // This test body is synchronous, so the close future is deliberately
        // not awaited.
        unawaited(subject.close());

        expect(() => subject.add(2));
        expect(() => subject.addError(Exception()));
        expect(subject.values);
      });

      test('stream returns a read-only stream', () async {
        final subject = ReplaySubject<int>()..add(1);

        expect(subject.stream.values);
        expect(subject.stream.values);

        expect(subject.stream.values);

        {
          final stream = subject.stream;
          expect(stream.isBroadcast);
          expect(stream);
          expect(stream);
        }

        // Both an identity and an equality comparison of two separate
        // `values` reads are reported.
        expect(identical(subject.stream.values, subject.stream.values));
        expect(subject.stream.values == subject.stream.values);
      });
    });
  }
}